/*
 * Copyright 2021 Huawei Technologies Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package internal

import (
	"fmt"
	"httpExporter/src/model"

	"os"

	MQTT "github.com/eclipse/paho.mqtt.golang"
	log "github.com/sirupsen/logrus"
)

var NodeInfo []model.Node
var MqInfo model.MessageQueue

type readings []map[string]interface{}

func getDataFromMqtt(url string, topic string) error {

	host := MqInfo.HostName
	port := MqInfo.Port
	qos := 0

	if host == "" || port == 0 {
		log.Error("Broker host or port is not configured")
		return fmt.Errorf("Get data Failed from MQ")
	}

	opts := MQTT.NewClientOptions()
	fmt.Println("Broker Host", fmt.Sprintf("tcp://%s:%d", host, port))

	if host == "rabbitmq" {
		pw := os.Getenv("RABBITMQ_PASSWORD")
		user := "admin"

		opts.SetUsername(user)
		opts.SetPassword(pw)
	}

	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
	choke := make(chan []byte)

	//opts.SetDefaultPublishHandler(messagePubHandler)
	opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
		//fmt.Println("mqtt msg:", msg)
		choke <- msg.Payload()
	})

	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Error(token.Error(), "mqtt connect failed")
		return token.Error()
	}

	if token := client.Subscribe(topic, byte(qos), nil); token.Wait() && token.Error() != nil {
		log.Error(token.Error(), "mqtt subscribe failed")
		return token.Error()
	}

	err := sendToHttp(url, topic ,choke)
	if err != nil {
		return err
	}

	defer client.Disconnect(250)
	fmt.Println("Subscriber Disconnected")
	return nil
}
